use std::collections::BTreeMap;

use anyhow::Context;
use application::{
    log_streaming::LogSinkWithId,
    Application,
};
use axum::{
    extract::FromRef,
    response::IntoResponse,
};
use common::{
    http::{
        extract::{
            Json,
            MtState,
            Path,
        },
        HttpResponseError,
    },
    knobs::AXIOM_MAX_ATTRIBUTES,
    log_streaming::LogEventFormatVersion,
};
use errors::ErrorMetadata;
use http::StatusCode;
use model::log_sinks::types::{
    axiom::{
        AxiomAttribute,
        AxiomConfig,
        VALID_AXIOM_INGEST_URLS,
    },
    datadog::{
        DatadogConfig,
        DatadogSiteLocation,
    },
    sentry::{
        ExceptionFormatVersion,
        SentryConfig,
        SerializedSentryConfig,
    },
    webhook::{
        generate_webhook_hmac_secret,
        WebhookConfig,
        WebhookFormat,
    },
    SinkConfig,
    SinkType,
};
use runtime::prod::ProdRuntime;
use sentry::types::Dsn;
use serde::{
    Deserialize,
    Serialize,
};
use utoipa::ToSchema;
use utoipa_axum::router::OpenApiRouter;
use value::{
    FieldName,
    ResolvedDocumentId,
};

use crate::{
    admin::{
        must_be_admin,
        must_be_admin_with_write_access,
    },
    authentication::ExtractIdentity,
    LocalAppState,
};

fn validate_axiom_ingest_url(ingest_url: Option<&String>) -> anyhow::Result<()> {
    if let Some(url) = ingest_url
        && !VALID_AXIOM_INGEST_URLS.contains(&url.as_str())
    {
        anyhow::bail!(ErrorMetadata::bad_request(
            "InvalidAxiomIngestUrl",
            format!(
                "Invalid Axiom ingest URL: {url}. Must be one of: {}",
                VALID_AXIOM_INGEST_URLS.join(", ")
            ),
        ));
    }
    Ok(())
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DatadogSinkPostArgs {
    site_location: DatadogSiteLocation,
    dd_api_key: String,
    dd_tags: Vec<String>,
    version: Option<String>,
    service: Option<String>,
}

impl TryFrom<DatadogSinkPostArgs> for DatadogConfig {
    type Error = anyhow::Error;

    fn try_from(value: DatadogSinkPostArgs) -> Result<Self, Self::Error> {
        Ok(Self {
            site_location: value.site_location,
            dd_api_key: value.dd_api_key.into(),
            dd_tags: value.dd_tags,
            version: match value.version {
                Some(v) => v.parse().context(ErrorMetadata::bad_request(
                    "InvalidLogStreamVersion",
                    format!("Invalid log stream version {v}"),
                ))?,
                None => LogEventFormatVersion::V1,
            },
            service: value.service,
        })
    }
}

pub async fn add_datadog_sink(
    MtState(st): MtState<LocalAppState>,
    ExtractIdentity(identity): ExtractIdentity,
    Json(args): Json<DatadogSinkPostArgs>,
) -> Result<impl IntoResponse, HttpResponseError> {
    must_be_admin_with_write_access(&identity)?;
    st.application
        .ensure_log_streaming_allowed(identity)
        .await?;

    let config: DatadogConfig = args.try_into()?;
    st.application
        .add_log_sink(SinkConfig::Datadog(config))
        .await?;
    Ok(StatusCode::OK)
}

pub async fn regenerate_webhook_secret(
    MtState(st): MtState<LocalAppState>,
    ExtractIdentity(identity): ExtractIdentity,
) -> Result<impl IntoResponse, HttpResponseError> {
    must_be_admin_with_write_access(&identity)?;
    st.application
        .ensure_log_streaming_allowed(identity)
        .await?;

    let Some(SinkConfig::Webhook(existing_webhook_sink)) =
        st.application.get_log_sink(&SinkType::Webhook).await?
    else {
        return Err(anyhow::anyhow!(ErrorMetadata::bad_request(
            "NoWebhookLogStream",
            "No webhook log stream exists for this deployment"
        ))
        .into());
    };

    let hmac_secret = generate_webhook_hmac_secret(st.application.runtime());

    let config = WebhookConfig {
        url: existing_webhook_sink.url,
        format: existing_webhook_sink.format,
        hmac_secret,
    };
    st.application
        .add_log_sink(SinkConfig::Webhook(config))
        .await?;

    Ok(StatusCode::OK)
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WebhookSinkPostArgs {
    url: String,
    format: WebhookFormat,
}

pub async fn add_webhook_sink(
    MtState(st): MtState<LocalAppState>,
    ExtractIdentity(identity): ExtractIdentity,
    Json(args): Json<WebhookSinkPostArgs>,
) -> Result<impl IntoResponse, HttpResponseError> {
    must_be_admin_with_write_access(&identity)?;
    st.application
        .ensure_log_streaming_allowed(identity)
        .await?;

    add_webhook_sink_inner(st.application, args).await?;

    Ok(StatusCode::OK)
}

async fn add_webhook_sink_inner(
    application: Application<ProdRuntime>,
    args: WebhookSinkPostArgs,
) -> Result<(ResolvedDocumentId, String), HttpResponseError> {
    let existing_webhook_sink = application.get_log_sink(&SinkType::Webhook).await?;

    let hmac_secret = match existing_webhook_sink {
        Some(SinkConfig::Webhook(WebhookConfig {
            hmac_secret: existing_secret,
            ..
        })) => existing_secret,
        _ => generate_webhook_hmac_secret(application.runtime()),
    };

    let url = args.url.parse().map_err(|_| {
        anyhow::anyhow!(ErrorMetadata::bad_request(
            "InvalidWebhookUrl",
            "The URL passed was invalid"
        ))
    })?;

    let config = WebhookConfig {
        url,
        format: args.format,
        hmac_secret: hmac_secret.clone(),
    };
    let id = application
        .add_log_sink(SinkConfig::Webhook(config))
        .await?;

    Ok((id, hmac_secret))
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AxiomSinkPostArgs {
    api_key: String,
    dataset_name: String,
    attributes: Vec<AxiomAttribute>,
    version: Option<String>,
    ingest_url: Option<String>,
}

impl TryFrom<AxiomSinkPostArgs> for AxiomConfig {
    type Error = anyhow::Error;

    fn try_from(value: AxiomSinkPostArgs) -> Result<Self, Self::Error> {
        validate_axiom_ingest_url(value.ingest_url.as_ref())?;

        Ok(Self {
            api_key: value.api_key.into(),
            dataset_name: value.dataset_name,
            attributes: value.attributes,
            version: match value.version {
                Some(v) => v.parse().context(ErrorMetadata::bad_request(
                    "InvalidLogStreamVersion",
                    format!("Invalid log stream version {v}"),
                ))?,
                None => LogEventFormatVersion::V1,
            },
            ingest_url: value.ingest_url,
        })
    }
}

pub async fn add_axiom_sink(
    MtState(st): MtState<LocalAppState>,
    ExtractIdentity(identity): ExtractIdentity,
    Json(args): Json<AxiomSinkPostArgs>,
) -> Result<impl IntoResponse, HttpResponseError> {
    must_be_admin_with_write_access(&identity)?;
    st.application
        .ensure_log_streaming_allowed(identity)
        .await?;

    add_axiom_sink_inner(st.application, args).await?;

    Ok(StatusCode::OK)
}

async fn add_axiom_sink_inner(
    application: Application<ProdRuntime>,
    args: AxiomSinkPostArgs,
) -> Result<ResolvedDocumentId, HttpResponseError> {
    if args.attributes.len() > *AXIOM_MAX_ATTRIBUTES {
        return Err(anyhow::anyhow!(
            "Exceeded max number of Axiom attributes. Contact support@convex.dev to request a \
             limit increase."
        )
        .into());
    }

    let config: AxiomConfig = args.try_into()?;
    let id = application.add_log_sink(SinkConfig::Axiom(config)).await?;

    Ok(id)
}

pub async fn add_sentry_sink(
    MtState(st): MtState<LocalAppState>,
    ExtractIdentity(identity): ExtractIdentity,
    Json(args): Json<SerializedSentryConfig>,
) -> Result<impl IntoResponse, HttpResponseError> {
    must_be_admin_with_write_access(&identity)?;
    st.application
        .ensure_log_streaming_allowed(identity)
        .await?;
    st.application
        .add_log_sink(SinkConfig::Sentry(args.try_into()?))
        .await?;
    Ok(StatusCode::OK)
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LogSinkDeleteArgs {
    sink_type: SinkType,
}

pub async fn delete_log_sink(
    MtState(st): MtState<LocalAppState>,
    ExtractIdentity(identity): ExtractIdentity,
    Json(LogSinkDeleteArgs { sink_type }): Json<LogSinkDeleteArgs>,
) -> Result<impl IntoResponse, HttpResponseError> {
    must_be_admin_with_write_access(&identity)?;
    st.application
        .ensure_log_streaming_allowed(identity)
        .await?;

    st.application.remove_log_sink(sink_type).await?;
    Ok(StatusCode::OK)
}

#[derive(Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
enum LogStreamType {
    Datadog,
    Webhook,
    Axiom,
    Sentry,
}

impl From<LogStreamType> for SinkType {
    fn from(log_stream_type: LogStreamType) -> Self {
        match log_stream_type {
            LogStreamType::Datadog => SinkType::Datadog,
            LogStreamType::Webhook => SinkType::Webhook,
            LogStreamType::Axiom => SinkType::Axiom,
            LogStreamType::Sentry => SinkType::Sentry,
        }
    }
}

/// Delete log stream
///
/// Delete the deployment's log stream with the given id.
#[utoipa::path(
    post,
    path = "/delete_log_stream/{id}",
    responses((status = 200)),
    params(
        ("id" = String, Path, description = "id of the log stream to delete"),
    ),
    security(
        ("Deploy Key" = []),
        ("OAuth Team Token" = []),
        ("Team Token" = []),
        ("OAuth Project Token" = []),
    ),
)]
pub async fn delete_log_stream(
    MtState(st): MtState<LocalAppState>,
    ExtractIdentity(identity): ExtractIdentity,
    Path(id): Path<String>,
) -> Result<impl IntoResponse, HttpResponseError> {
    must_be_admin_with_write_access(&identity)?;
    st.application
        .ensure_log_streaming_allowed(identity)
        .await?;

    st.application.remove_log_sink_by_id(id).await?;
    Ok(StatusCode::OK)
}

#[derive(Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateDatadogLogStreamArgs {
    /// Location of your Datadog deployment.
    site_location: DatadogSiteLocation,
    /// Datadog API key for authentication.
    dd_api_key: String,
    /// Optional comma-separated list of tags. These are sent to Datadog in each
    /// log event via the `ddtags` field.
    dd_tags: Vec<String>,
    /// Service name used as a special tag in Datadog.
    service: Option<String>,
}

impl TryFrom<CreateDatadogLogStreamArgs> for DatadogConfig {
    type Error = anyhow::Error;

    fn try_from(value: CreateDatadogLogStreamArgs) -> Result<Self, Self::Error> {
        Ok(Self {
            site_location: value.site_location,
            dd_api_key: value.dd_api_key.into(),
            dd_tags: value.dd_tags,
            version: LogEventFormatVersion::V2,
            service: value.service,
        })
    }
}

#[derive(Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateWebhookLogStreamArgs {
    /// URL to send logs to.
    url: String,
    /// Format for the webhook payload. JSONL sends one object per line of
    /// request, JSON sends one array per request.
    format: WebhookFormat,
}

#[derive(Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateAxiomLogStreamArgs {
    /// Axiom API key for authentication.
    api_key: String,
    /// Name of the dataset in Axiom. This is where the logs will be sent.
    dataset_name: String,
    /// Optional list of attributes. These are extra fields and values sent to
    /// Axiom in each log event.
    attributes: Vec<AxiomAttribute>,
    /// Optional ingest endpoint for Axiom
    ingest_url: Option<String>,
}

impl TryFrom<CreateAxiomLogStreamArgs> for AxiomConfig {
    type Error = anyhow::Error;

    fn try_from(value: CreateAxiomLogStreamArgs) -> Result<Self, Self::Error> {
        validate_axiom_ingest_url(value.ingest_url.as_ref())?;

        Ok(Self {
            api_key: value.api_key.into(),
            dataset_name: value.dataset_name,
            attributes: value.attributes,
            version: LogEventFormatVersion::V2,
            ingest_url: value.ingest_url,
        })
    }
}

#[derive(Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateSentryLogStreamArgs {
    /// Sentry Data Source Name (DSN) to route exceptions to.
    dsn: String,
    /// Tags to add to all events routed to Sentry.
    #[schema(value_type = Option<BTreeMap<String, String>>)]
    tags: Option<BTreeMap<FieldName, String>>,
}

impl TryFrom<CreateSentryLogStreamArgs> for SentryConfig {
    type Error = anyhow::Error;

    fn try_from(value: CreateSentryLogStreamArgs) -> Result<Self, Self::Error> {
        Ok(Self {
            dsn: value
                .dsn
                .parse::<Dsn>()
                .context(ErrorMetadata::bad_request(
                    "InvalidSentryDsn",
                    "The Sentry DSN passed was invalid",
                ))?
                .into(),
            tags: value.tags,
            version: ExceptionFormatVersion::V2,
        })
    }
}

#[derive(Deserialize, ToSchema)]
#[serde(rename_all = "camelCase", tag = "logStreamType")]
pub enum CreateLogStreamArgs {
    #[schema(title = "Datadog")]
    Datadog(CreateDatadogLogStreamArgs),
    #[schema(title = "Webhook")]
    Webhook(CreateWebhookLogStreamArgs),
    #[schema(title = "Axiom")]
    Axiom(CreateAxiomLogStreamArgs),
    #[schema(title = "Sentry")]
    Sentry(CreateSentryLogStreamArgs),
}

#[derive(Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateWebhookLogStreamResponse {
    id: String,
    /// Use this secret to verify webhook signatures.
    hmac_secret: String,
}

#[derive(Serialize, ToSchema)]
#[serde(rename_all = "camelCase", tag = "logStreamType")]
pub enum CreateLogStreamResponse {
    #[schema(title = "Webhook")]
    Webhook(CreateWebhookLogStreamResponse),
    #[schema(title = "Datadog")]
    Datadog { id: String },
    #[schema(title = "Axiom")]
    Axiom { id: String },
    #[schema(title = "Sentry")]
    Sentry { id: String },
}

async fn ensure_log_sink_does_not_exist(
    application: &Application<ProdRuntime>,
    sink_type: &SinkType,
) -> Result<(), HttpResponseError> {
    if application.get_log_sink(sink_type).await?.is_some() {
        return Err(anyhow::anyhow!(ErrorMetadata::conflict(
            "LogStreamAlreadyExists",
            format!("{sink_type:?} log stream already exists for this deployment",)
        ))
        .into());
    }
    Ok(())
}

/// Create log stream
///
/// Create a new log stream for the deployment. Errors if a log stream of the
/// given type already exists.
#[utoipa::path(
    post,
    path = "/create_log_stream",
    responses((status = 200, body = CreateLogStreamResponse)),
    security(
        ("Deploy Key" = []),
        ("OAuth Team Token" = []),
        ("Team Token" = []),
        ("OAuth Project Token" = []),
    ),
)]
pub async fn create_log_stream(
    MtState(st): MtState<LocalAppState>,
    ExtractIdentity(identity): ExtractIdentity,
    Json(args): Json<CreateLogStreamArgs>,
) -> Result<impl IntoResponse, HttpResponseError> {
    must_be_admin_with_write_access(&identity)?;
    st.application
        .ensure_log_streaming_allowed(identity)
        .await?;

    match args {
        CreateLogStreamArgs::Datadog(datadog_sink_post_args) => {
            ensure_log_sink_does_not_exist(&st.application, &SinkType::Datadog).await?;

            let config: DatadogConfig = datadog_sink_post_args.try_into()?;
            let id = st
                .application
                .add_log_sink(SinkConfig::Datadog(config))
                .await?;
            Ok(Json(CreateLogStreamResponse::Datadog {
                id: id.to_string(),
            }))
        },
        CreateLogStreamArgs::Webhook(webhook_sink_post_args) => {
            ensure_log_sink_does_not_exist(&st.application, &SinkType::Webhook).await?;

            let hmac_secret = generate_webhook_hmac_secret(st.application.runtime());

            let url = webhook_sink_post_args.url.parse().map_err(|_| {
                anyhow::anyhow!(ErrorMetadata::bad_request(
                    "InvalidWebhookUrl",
                    "The URL passed was invalid"
                ))
            })?;

            let config = WebhookConfig {
                url,
                format: webhook_sink_post_args.format,
                hmac_secret: hmac_secret.clone(),
            };
            let id = st
                .application
                .add_log_sink(SinkConfig::Webhook(config))
                .await?;

            Ok(Json(CreateLogStreamResponse::Webhook(
                CreateWebhookLogStreamResponse {
                    hmac_secret,
                    id: id.to_string(),
                },
            )))
        },
        CreateLogStreamArgs::Axiom(axiom_sink_post_args) => {
            ensure_log_sink_does_not_exist(&st.application, &SinkType::Axiom).await?;

            if axiom_sink_post_args.attributes.len() > *AXIOM_MAX_ATTRIBUTES {
                return Err(anyhow::anyhow!(
                    "Exceeded max number of Axiom attributes. Contact support@convex.dev to \
                     request a limit increase."
                )
                .into());
            }

            let config: AxiomConfig = axiom_sink_post_args.try_into()?;
            let id = st
                .application
                .add_log_sink(SinkConfig::Axiom(config))
                .await?;

            Ok(Json(CreateLogStreamResponse::Axiom { id: id.to_string() }))
        },
        CreateLogStreamArgs::Sentry(sentry_config_args) => {
            ensure_log_sink_does_not_exist(&st.application, &SinkType::Sentry).await?;

            let config = sentry_config_args.try_into()?;
            let id = st
                .application
                .add_log_sink(SinkConfig::Sentry(config))
                .await?;
            Ok(Json(CreateLogStreamResponse::Sentry { id: id.to_string() }))
        },
    }
}

#[derive(Deserialize, ToSchema)]
#[serde(rename_all = "camelCase", tag = "logStreamType")]
pub enum RotateLogStreamSecretArgs {
    Webhook,
}

#[derive(Serialize, ToSchema)]
#[serde(rename_all = "camelCase", tag = "logStreamType")]
pub enum RotateLogStreamSecretResponse {
    #[serde(rename_all = "camelCase")]
    #[schema(title = "Webhook")]
    Webhook { hmac_secret: String },
}

/// Rotate webhook log stream secret
///
/// Rotate the secret for the webhook log stream.
#[utoipa::path(
    post,
    path = "/rotate_webhook_secret/{id}",
    responses((status = 200, body = RotateLogStreamSecretResponse)),
    params(
        ("id" = String, Path, description = "id of the webhook log stream for which to rotate the secret"),
    ),
    security(
        ("Deploy Key" = []),
        ("OAuth Team Token" = []),
        ("Team Token" = []),
        ("OAuth Project Token" = []),
    ),
)]
pub async fn rotate_webhook_secret(
    MtState(st): MtState<LocalAppState>,
    ExtractIdentity(identity): ExtractIdentity,
    Path(id): Path<String>,
) -> Result<impl IntoResponse, HttpResponseError> {
    must_be_admin_with_write_access(&identity)?;
    st.application
        .ensure_log_streaming_allowed(identity)
        .await?;

    let Some(LogSinkWithId {
        config: sink_config,
        ..
    }) = st.application.get_log_sink_by_id(&id).await?
    else {
        return Err(anyhow::anyhow!(ErrorMetadata::bad_request(
            "LogStreamDoesntExist",
            "No log stream with the given id exists for this deployment",
        ))
        .into());
    };

    match sink_config {
        SinkConfig::Webhook(existing_webhook_sink) => {
            let hmac_secret = generate_webhook_hmac_secret(st.application.runtime());

            let config = WebhookConfig {
                url: existing_webhook_sink.url,
                format: existing_webhook_sink.format,
                hmac_secret: hmac_secret.clone(),
            };
            st.application
                .patch_log_sink_config(&id, SinkConfig::Webhook(config))
                .await?;

            Ok(Json(RotateLogStreamSecretResponse::Webhook { hmac_secret }))
        },
        _ => Err(anyhow::anyhow!(ErrorMetadata::bad_request(
            "NoSecretToRotate",
            "This log stream does not have a secret to rotate."
        ))
        .into()),
    }
}

#[derive(Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase", tag = "logStreamType")]
enum LogStreamConfig {
    #[schema(title = "Datadog")]
    Datadog(DatadogLogStreamConfig),
    #[schema(title = "Webhook")]
    Webhook(WebhookLogStreamConfig),
    #[schema(title = "Axiom")]
    Axiom(AxiomLogStreamConfig),
    #[schema(title = "Sentry")]
    Sentry(SentryLogStreamConfig),
}

#[derive(Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
#[schema(title = "DatadogConfig")]
pub struct DatadogLogStreamConfig {
    pub id: String,
    /// Location of your Datadog deployment.
    pub site_location: DatadogSiteLocation,
    /// Optional comma-separated list of tags. These are sent to Datadog in each
    /// log event via the `ddtags` field.
    pub dd_tags: Vec<String>,
    /// Service name used as a special tag in Datadog.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub service: Option<String>,
}

#[derive(Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
#[schema(title = "WebhookConfig")]
pub struct WebhookLogStreamConfig {
    pub id: String,
    /// URL to send logs to.
    pub url: String,
    /// Format for the webhook payload. JSONL sends one object per line of
    /// request, JSON sends one array per request.
    pub format: WebhookFormat,
    /// Use this secret to verify webhook signatures.
    pub hmac_secret: String,
}

#[derive(Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
#[schema(title = "AxiomConfig")]
pub struct AxiomLogStreamConfig {
    pub id: String,
    /// Name of the dataset in Axiom. This is where the logs will be sent.
    pub dataset_name: String,
    /// Optional list of attributes. These are extra fields and values sent to
    /// Axiom in each log event.
    pub attributes: Vec<AxiomAttribute>,
    /// Optional ingest endpoint for Axiom
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ingest_url: Option<String>,
}

#[derive(Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
#[schema(title = "SentryConfig")]
pub struct SentryLogStreamConfig {
    pub id: String,
    /// Tags to add to all events routed to Sentry.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<BTreeMap<String, String>>,
}

fn log_sink_to_log_stream_config(sink: LogSinkWithId) -> Option<LogStreamConfig> {
    match sink.config {
        SinkConfig::Datadog(config) => Some(LogStreamConfig::Datadog(DatadogLogStreamConfig {
            id: sink.id.to_string(),
            site_location: config.site_location,
            dd_tags: config.dd_tags,
            service: config.service,
        })),
        SinkConfig::Webhook(config) => Some(LogStreamConfig::Webhook(WebhookLogStreamConfig {
            id: sink.id.to_string(),
            url: config.url.to_string(),
            format: config.format,
            hmac_secret: config.hmac_secret,
        })),
        SinkConfig::Axiom(config) => Some(LogStreamConfig::Axiom(AxiomLogStreamConfig {
            id: sink.id.to_string(),
            dataset_name: config.dataset_name,
            attributes: config.attributes,
            ingest_url: config.ingest_url,
        })),
        SinkConfig::Sentry(config) => Some(LogStreamConfig::Sentry(SentryLogStreamConfig {
            id: sink.id.to_string(),
            tags: config
                .tags
                .map(|tags| tags.into_iter().map(|(k, v)| (k.into(), v)).collect()),
        })),
        _ => None,
    }
}

/// List log streams
///
/// List configs for all existing log streams in a deployment.
#[utoipa::path(
    get,
    path = "/list_log_streams",
    responses((status = 200, body = Vec<LogStreamConfig>)),
    security(
        ("Deploy Key" = []),
        ("OAuth Team Token" = []),
        ("Team Token" = []),
        ("OAuth Project Token" = []),
    ),
)]
pub async fn list_log_streams(
    MtState(st): MtState<LocalAppState>,
    ExtractIdentity(identity): ExtractIdentity,
) -> Result<impl IntoResponse, HttpResponseError> {
    must_be_admin(&identity)?;

    Ok(Json(
        st.application
            .list_log_sinks()
            .await?
            .into_iter()
            .filter_map(log_sink_to_log_stream_config)
            .collect::<Vec<LogStreamConfig>>(),
    ))
}

/// Get log stream
///
/// Get the config for a specific log stream by id.
#[utoipa::path(
    get,
    path = "/get_log_stream/{id}",
    responses((status = 200, body = LogStreamConfig)),
    params(
        ("id" = String, Path, description = "id of the log stream to retrieve"),
    ),
    security(
        ("Deploy Key" = []),
        ("OAuth Team Token" = []),
        ("Team Token" = []),
        ("OAuth Project Token" = []),
    ),
)]
pub async fn get_log_stream(
    MtState(st): MtState<LocalAppState>,
    ExtractIdentity(identity): ExtractIdentity,
    Path(id): Path<String>,
) -> Result<impl IntoResponse, HttpResponseError> {
    must_be_admin(&identity)?;

    let Some(log_sink_with_id) = st.application.get_log_sink_by_id(&id).await? else {
        return Err(anyhow::anyhow!(ErrorMetadata::bad_request(
            "LogStreamDoesntExist",
            "No log stream with the given id exists for this deployment",
        ))
        .into());
    };

    let config = log_sink_to_log_stream_config(log_sink_with_id).ok_or_else(|| {
        anyhow::anyhow!(ErrorMetadata::bad_request(
            "UnsupportedLogStreamType",
            "This log stream type is not supported",
        ))
    })?;

    Ok(Json(config))
}

#[derive(Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct UpdateDatadogSinkArgs {
    /// Location of your Datadog deployment.
    #[serde(default)]
    site_location: Option<DatadogSiteLocation>,
    /// Datadog API key for authentication.
    #[serde(default)]
    dd_api_key: Option<String>,
    /// Optional comma-separated list of tags. These are sent to Datadog in each
    /// log event via the `ddtags` field.
    #[serde(default)]
    dd_tags: Option<Vec<String>>,
    /// Service name used as a special tag in Datadog.
    #[serde(default, with = "::serde_with::rust::double_option")]
    service: Option<Option<String>>,
}

#[derive(Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct UpdateWebhookSinkArgs {
    /// URL to send logs to.
    #[serde(default)]
    url: Option<String>,
    /// Format for the webhook payload. JSONL sends one object per line of
    /// request, JSON sends one array per request.
    #[serde(default)]
    format: Option<WebhookFormat>,
}

#[derive(Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct UpdateAxiomSinkArgs {
    /// Axiom API key for authentication.
    #[serde(default)]
    api_key: Option<String>,
    /// Name of the dataset in Axiom. This is where the logs will be sent.
    #[serde(default)]
    dataset_name: Option<String>,
    /// Optional list of attributes. These are extra fields and values sent to
    /// Axiom in each log event.
    #[serde(default)]
    attributes: Option<Vec<AxiomAttribute>>,
    /// Optional ingest endpoint for Axiom
    #[serde(default, with = "::serde_with::rust::double_option")]
    ingest_url: Option<Option<String>>,
}

#[derive(Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct UpdateSentrySinkArgs {
    /// Sentry Data Source Name (DSN) to route exceptions to.
    #[serde(default)]
    dsn: Option<String>,
    /// Tags to add to all events routed to Sentry.
    #[serde(default, with = "::serde_with::rust::double_option")]
    #[schema(value_type = Option<Option<BTreeMap<String, String>>>)]
    tags: Option<Option<BTreeMap<FieldName, String>>>,
}

#[derive(Deserialize, ToSchema)]
#[serde(rename_all = "camelCase", tag = "logStreamType")]
pub enum UpdateLogStreamArgs {
    #[schema(title = "Datadog")]
    Datadog(UpdateDatadogSinkArgs),
    #[schema(title = "Webhook")]
    Webhook(UpdateWebhookSinkArgs),
    #[schema(title = "Axiom")]
    Axiom(UpdateAxiomSinkArgs),
    #[schema(title = "Sentry")]
    Sentry(UpdateSentrySinkArgs),
}

/// Update log stream
///
/// Update an existing log stream for the deployment. Omit a field to keep the
/// existing value, and use `null` to unset a field.
#[utoipa::path(
    post,
    path = "/update_log_stream/{id}",
    responses((status = 200)),
    params(
        ("id" = String, Path, description = "id of the log stream to update"),
    ),
    security(
        ("Deploy Key" = []),
        ("OAuth Team Token" = []),
        ("Team Token" = []),
        ("OAuth Project Token" = []),
    ),
)]
pub async fn update_log_stream(
    MtState(st): MtState<LocalAppState>,
    ExtractIdentity(identity): ExtractIdentity,
    Path(id): Path<String>,
    Json(args): Json<UpdateLogStreamArgs>,
) -> Result<impl IntoResponse, HttpResponseError> {
    must_be_admin_with_write_access(&identity)?;
    st.application
        .ensure_log_streaming_allowed(identity)
        .await?;

    let Some(LogSinkWithId {
        config: sink_config,
        ..
    }) = st.application.get_log_sink_by_id(&id).await?
    else {
        return Err(anyhow::anyhow!(ErrorMetadata::bad_request(
            "LogStreamDoesntExist",
            "No log stream with the given id exists for this deployment",
        ))
        .into());
    };

    match sink_config {
        SinkConfig::Datadog(existing_config) => {
            let UpdateLogStreamArgs::Datadog(update_args) = args else {
                return Err(anyhow::anyhow!(ErrorMetadata::bad_request(
                    "LogStreamTypeMismatch",
                    "Cannot update a Datadog log stream with arguments for a different log stream \
                     type",
                ))
                .into());
            };

            let config = DatadogConfig {
                site_location: update_args
                    .site_location
                    .unwrap_or(existing_config.site_location),
                dd_api_key: update_args
                    .dd_api_key
                    .map(|k| k.into())
                    .unwrap_or(existing_config.dd_api_key),
                dd_tags: update_args.dd_tags.unwrap_or(existing_config.dd_tags),
                version: existing_config.version,
                service: update_args.service.unwrap_or(existing_config.service),
            };

            st.application
                .patch_log_sink_config(&id, SinkConfig::Datadog(config))
                .await?;
            Ok(StatusCode::OK)
        },
        SinkConfig::Webhook(existing_config) => {
            let UpdateLogStreamArgs::Webhook(update_args) = args else {
                return Err(anyhow::anyhow!(ErrorMetadata::bad_request(
                    "LogStreamTypeMismatch",
                    "Cannot update a Webhook log stream with arguments for a different log stream \
                     type",
                ))
                .into());
            };

            let url = if let Some(url_str) = update_args.url {
                url_str.parse().map_err(|_| {
                    anyhow::anyhow!(ErrorMetadata::bad_request(
                        "InvalidWebhookUrl",
                        "The URL passed was invalid"
                    ))
                })?
            } else {
                existing_config.url
            };

            let config = WebhookConfig {
                url,
                format: update_args.format.unwrap_or(existing_config.format),
                hmac_secret: existing_config.hmac_secret,
            };

            st.application
                .patch_log_sink_config(&id, SinkConfig::Webhook(config))
                .await?;
            Ok(StatusCode::OK)
        },
        SinkConfig::Axiom(existing_config) => {
            let UpdateLogStreamArgs::Axiom(update_args) = args else {
                return Err(anyhow::anyhow!(ErrorMetadata::bad_request(
                    "LogStreamTypeMismatch",
                    "Cannot update an Axiom log stream with arguments for a different log stream \
                     type",
                ))
                .into());
            };

            let attributes = update_args.attributes.unwrap_or(existing_config.attributes);
            if attributes.len() > *AXIOM_MAX_ATTRIBUTES {
                return Err(anyhow::anyhow!(
                    "Exceeded max number of Axiom attributes. Contact support@convex.dev to \
                     request a limit increase."
                )
                .into());
            }

            let ingest_url = update_args.ingest_url.unwrap_or(existing_config.ingest_url);
            if ingest_url.is_some() {
                validate_axiom_ingest_url(ingest_url.as_ref())?
            }

            let config = AxiomConfig {
                api_key: update_args
                    .api_key
                    .map(|k| k.into())
                    .unwrap_or(existing_config.api_key),
                dataset_name: update_args
                    .dataset_name
                    .unwrap_or(existing_config.dataset_name),
                attributes,
                version: existing_config.version,
                ingest_url,
            };

            st.application
                .patch_log_sink_config(&id, SinkConfig::Axiom(config))
                .await?;
            Ok(StatusCode::OK)
        },
        SinkConfig::Sentry(existing_config) => {
            let UpdateLogStreamArgs::Sentry(update_args) = args else {
                return Err(anyhow::anyhow!(ErrorMetadata::bad_request(
                    "LogStreamTypeMismatch",
                    "Cannot update a Sentry log stream with arguments for a different log stream \
                     type",
                ))
                .into());
            };

            let dsn = if let Some(dsn_str) = update_args.dsn {
                dsn_str
                    .parse::<Dsn>()
                    .context(ErrorMetadata::bad_request(
                        "InvalidSentryDsn",
                        "The Sentry DSN passed was invalid",
                    ))?
                    .into()
            } else {
                existing_config.dsn
            };

            let config = SentryConfig {
                dsn,
                tags: update_args.tags.unwrap_or(existing_config.tags),
                version: existing_config.version,
            };

            st.application
                .patch_log_sink_config(&id, SinkConfig::Sentry(config))
                .await?;
            Ok(StatusCode::OK)
        },
        _ => Err(anyhow::anyhow!(ErrorMetadata::bad_request(
            "UnsupportedLogStreamType",
            "This log stream type does not support updates",
        ))
        .into()),
    }
}

pub fn platform_router<S>() -> OpenApiRouter<S>
where
    LocalAppState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    OpenApiRouter::new()
        .routes(utoipa_axum::routes!(list_log_streams))
        .routes(utoipa_axum::routes!(get_log_stream))
        .routes(utoipa_axum::routes!(delete_log_stream))
        .routes(utoipa_axum::routes!(create_log_stream))
        .routes(utoipa_axum::routes!(update_log_stream))
        .routes(utoipa_axum::routes!(rotate_webhook_secret))
}

#[cfg(test)]
mod tests {
    use model::log_sinks::types::{
        axiom::AxiomConfig,
        datadog::{
            DatadogConfig,
            DatadogSiteLocation,
        },
        LogSinksRow,
    };
    use serde_json::json;

    use crate::log_sinks::{
        AxiomSinkPostArgs,
        DatadogSinkPostArgs,
    };

    #[test]
    fn datadog_config_deserialize() -> anyhow::Result<()> {
        // Basic deserialize
        let json = json!({
            "siteLocation": "US1",
            "ddApiKey": "test_key",
            "ddTags": vec!["tag:abc","abc"],
        });
        let post_args: DatadogSinkPostArgs = serde_json::from_value(json)?;
        let config = DatadogConfig::try_from(post_args)?;
        assert_eq!(config.site_location, DatadogSiteLocation::US1);
        assert_eq!(config.dd_api_key, "test_key".to_string().into());
        assert_eq!(
            config.dd_tags,
            vec!["tag:abc".to_string(), "abc".to_string()]
        );

        // No tags
        let json = json!({
            "siteLocation": "US1",
            "ddApiKey": "test_key",
            "ddTags": Vec::<String>::new()
        });
        let post_args: DatadogSinkPostArgs = serde_json::from_value(json)?;
        let config = DatadogConfig::try_from(post_args)?;
        assert_eq!(config.site_location, DatadogSiteLocation::US1);
        assert_eq!(config.dd_api_key, "test_key".to_string().into());
        assert!(config.dd_tags.is_empty());

        // US1_FED -- ensure we handle the SCREAMING_SNAKE_CASE
        let json = json!({
            "siteLocation": "US1_FED",
            "ddApiKey": "test_key",
            "ddTags": Vec::<String>::new()
        });
        let post_args: DatadogSinkPostArgs = serde_json::from_value(json)?;
        let config = DatadogConfig::try_from(post_args)?;
        assert_eq!(config.site_location, DatadogSiteLocation::US1_FED);
        assert_eq!(config.dd_api_key, "test_key".to_string().into());
        assert!(config.dd_tags.is_empty());
        Ok(())
    }

    #[test]
    fn axiom_config_valid_ingest_urls() -> anyhow::Result<()> {
        // Test with no ingest_url (default)
        let json = json!({
            "apiKey": "test_key",
            "datasetName": "test_dataset",
            "attributes": [],
        });
        let post_args: AxiomSinkPostArgs = serde_json::from_value(json)?;
        let config = AxiomConfig::try_from(post_args)?;
        assert!(config.ingest_url.is_none());

        // Test with default URL
        let json = json!({
            "apiKey": "test_key",
            "datasetName": "test_dataset",
            "attributes": [],
            "ingestUrl": "https://api.axiom.co",
        });
        let post_args: AxiomSinkPostArgs = serde_json::from_value(json)?;
        let config = AxiomConfig::try_from(post_args)?;
        assert_eq!(config.ingest_url, Some("https://api.axiom.co".to_string()));

        // Test with US East 1
        let json = json!({
            "apiKey": "test_key",
            "datasetName": "test_dataset",
            "attributes": [],
            "ingestUrl": "https://us-east-1.aws.edge.axiom.co",
        });
        let post_args: AxiomSinkPostArgs = serde_json::from_value(json)?;
        let config = AxiomConfig::try_from(post_args)?;
        assert_eq!(
            config.ingest_url,
            Some("https://us-east-1.aws.edge.axiom.co".to_string())
        );

        // Test with EU Central 1
        let json = json!({
            "apiKey": "test_key",
            "datasetName": "test_dataset",
            "attributes": [],
            "ingestUrl": "https://eu-central-1.aws.edge.axiom.co",
        });
        let post_args: AxiomSinkPostArgs = serde_json::from_value(json)?;
        let config = AxiomConfig::try_from(post_args)?;
        assert_eq!(
            config.ingest_url,
            Some("https://eu-central-1.aws.edge.axiom.co".to_string())
        );

        Ok(())
    }

    #[test]
    fn axiom_config_invalid_ingest_url() {
        // Test with invalid URL
        let json = json!({
            "apiKey": "test_key",
            "datasetName": "test_dataset",
            "attributes": [],
            "ingestUrl": "https://invalid.axiom.co",
        });
        let post_args: AxiomSinkPostArgs = serde_json::from_value(json).unwrap();
        let result = AxiomConfig::try_from(post_args);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("Invalid Axiom ingest URL"));
        assert!(err.to_string().contains("https://invalid.axiom.co"));

        // Test with completely wrong URL
        let json = json!({
            "apiKey": "test_key",
            "datasetName": "test_dataset",
            "attributes": [],
            "ingestUrl": "https://example.com",
        });
        let post_args: AxiomSinkPostArgs = serde_json::from_value(json).unwrap();
        let result = AxiomConfig::try_from(post_args);
        assert!(result.is_err());
    }

    // Endpoint tests
    use axum::body::Body;
    use axum_extra::headers::authorization::Credentials;
    use common::{
        document::{
            ParseDocument,
            ParsedDocument,
        },
        log_streaming::LogEventFormatVersion,
    };
    use http::{
        Request,
        StatusCode,
    };
    use keybroker::Identity;
    use model::log_sinks::types::{
        sentry::ExceptionFormatVersion,
        SinkConfig,
    };
    use runtime::prod::ProdRuntime;
    use serde_json::Value as JsonValue;

    use crate::test_helpers::{
        setup_backend_for_test,
        TestLocalBackend,
    };

    async fn create_log_stream(
        backend: &TestLocalBackend,
        body: JsonValue,
    ) -> anyhow::Result<JsonValue> {
        let req = Request::builder()
            .uri("/api/v1/create_log_stream")
            .method("POST")
            .header("Content-Type", "application/json")
            .header("Authorization", backend.admin_auth_header.0.encode())
            .body(Body::from(serde_json::to_vec(&body)?))?;
        backend.expect_success(req).await
    }

    async fn list_log_streams(backend: &TestLocalBackend) -> anyhow::Result<Vec<JsonValue>> {
        let req = Request::builder()
            .uri("/api/v1/list_log_streams")
            .method("GET")
            .header("Authorization", backend.admin_auth_header.0.encode())
            .body(Body::empty())?;
        backend.expect_success(req).await
    }

    async fn get_log_stream(backend: &TestLocalBackend, id: &str) -> anyhow::Result<JsonValue> {
        let req = Request::builder()
            .uri(format!("/api/v1/get_log_stream/{id}"))
            .method("GET")
            .header("Authorization", backend.admin_auth_header.0.encode())
            .body(Body::empty())?;
        backend.expect_success(req).await
    }

    async fn update_log_stream(
        backend: &TestLocalBackend,
        id: &str,
        body: JsonValue,
    ) -> anyhow::Result<()> {
        let req = Request::builder()
            .uri(format!("/api/v1/update_log_stream/{id}"))
            .method("POST")
            .header("Content-Type", "application/json")
            .header("Authorization", backend.admin_auth_header.0.encode())
            .body(Body::from(serde_json::to_vec(&body)?))?;
        backend.expect_success(req).await
    }

    async fn delete_log_stream(backend: &TestLocalBackend, id: &str) -> anyhow::Result<()> {
        let req = Request::builder()
            .uri(format!("/api/v1/delete_log_stream/{id}"))
            .method("POST")
            .header("Authorization", backend.admin_auth_header.0.encode())
            .body(Body::empty())?;
        backend.expect_success(req).await
    }

    async fn rotate_webhook_secret(
        backend: &TestLocalBackend,
        id: &str,
    ) -> anyhow::Result<JsonValue> {
        let req = Request::builder()
            .uri(format!("/api/v1/rotate_webhook_secret/{id}"))
            .method("POST")
            .header("Authorization", backend.admin_auth_header.0.encode())
            .body(Body::empty())?;
        backend.expect_success(req).await
    }

    async fn get_log_sink_from_db(
        backend: &TestLocalBackend,
        id: &str,
    ) -> anyhow::Result<Option<SinkConfig>> {
        let mut tx = backend.st.application.begin(Identity::system()).await?;
        let resolved_id = tx.resolve_developer_id(&id.parse()?, value::TableNamespace::Global)?;
        let doc = tx.get(resolved_id).await?;
        if let Some(doc) = doc {
            let row: ParsedDocument<LogSinksRow> = doc.parse()?;
            Ok(Some(row.config.clone()))
        } else {
            Ok(None)
        }
    }

    #[convex_macro::prod_rt_test]
    async fn test_list_log_streams_empty(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;
        let streams = list_log_streams(&backend).await?;
        assert_eq!(streams.len(), 0);
        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_list_log_streams(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        // Create a webhook log stream
        create_log_stream(
            &backend,
            json!({
                "logStreamType": "webhook",
                "url": "https://example.com/webhook",
                "format": "jsonl",
            }),
        )
        .await?;

        // Create a datadog log stream
        create_log_stream(
            &backend,
            json!({
                "logStreamType": "datadog",
                "siteLocation": "US1",
                "ddApiKey": "test_key",
                "ddTags": ["tag1", "tag2"],
                "service": "my-service",
            }),
        )
        .await?;

        let streams = list_log_streams(&backend).await?;
        assert_eq!(streams.len(), 2);

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_get_log_stream(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        let create_response = create_log_stream(
            &backend,
            json!({
                "logStreamType": "webhook",
                "url": "https://example.com/webhook",
                "format": "jsonl",
            }),
        )
        .await?;

        let id = create_response["id"].as_str().unwrap();
        let stream = get_log_stream(&backend, id).await?;

        assert_eq!(stream["logStreamType"], "webhook");
        assert_eq!(stream["url"], "https://example.com/webhook");
        assert_eq!(stream["format"], "jsonl");

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_get_log_stream_not_found(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        let req = Request::builder()
            .uri("/api/v1/get_log_stream/invalid_id")
            .method("GET")
            .header("Authorization", backend.admin_auth_header.0.encode())
            .body(Body::empty())?;

        backend
            .expect_error(req, StatusCode::BAD_REQUEST, "InvalidLogStreamId")
            .await?;

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_create_log_stream_webhook(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        let response = create_log_stream(
            &backend,
            json!({
                "logStreamType": "webhook",
                "url": "https://example.com/webhook",
                "format": "jsonl",
            }),
        )
        .await?;

        assert!(response["id"].is_string());
        assert!(response["hmacSecret"].is_string());
        assert_eq!(response["logStreamType"], "webhook");

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_create_log_stream_datadog_defaults_to_v2(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        let response = create_log_stream(
            &backend,
            json!({
                "logStreamType": "datadog",
                "siteLocation": "US1",
                "ddApiKey": "test_key",
                "ddTags": ["tag1"],
                "service": "my-service",
            }),
        )
        .await?;

        let id = response["id"].as_str().unwrap();

        // Check the version in the database
        let config = get_log_sink_from_db(&backend, id).await?;
        assert!(config.is_some());
        match config.unwrap() {
            SinkConfig::Datadog(config) => {
                assert_eq!(config.version, LogEventFormatVersion::V2);
            },
            _ => panic!("Expected Datadog config"),
        }

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_create_log_stream_axiom_defaults_to_v2(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        let response = create_log_stream(
            &backend,
            json!({
                "logStreamType": "axiom",
                "apiKey": "test_key",
                "datasetName": "my-dataset",
                "attributes": [],
            }),
        )
        .await?;

        let id = response["id"].as_str().unwrap();

        // Check the version in the database
        let config = get_log_sink_from_db(&backend, id).await?;
        assert!(config.is_some());
        match config.unwrap() {
            SinkConfig::Axiom(config) => {
                assert_eq!(config.version, LogEventFormatVersion::V2);
            },
            _ => panic!("Expected Axiom config"),
        }

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_create_log_stream_sentry_defaults_to_v2(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        let response = create_log_stream(
            &backend,
            json!({
                "logStreamType": "sentry",
                "dsn": "https://ef1d32d342354c87869ab2db8b490b2c@o1192621.ingest.sentry.io/6333191",
                "tags": null,
            }),
        )
        .await?;

        let id = response["id"].as_str().unwrap();

        // Check the version in the database
        let config = get_log_sink_from_db(&backend, id).await?;
        assert!(config.is_some());
        match config.unwrap() {
            SinkConfig::Sentry(config) => {
                assert_eq!(config.version, ExceptionFormatVersion::V2);
            },
            _ => panic!("Expected Sentry config"),
        }

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_create_log_stream_already_exists(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        create_log_stream(
            &backend,
            json!({
                "logStreamType": "webhook",
                "url": "https://example.com/webhook",
                "format": "jsonl",
            }),
        )
        .await?;

        let req = Request::builder()
            .uri("/api/v1/create_log_stream")
            .method("POST")
            .header("Content-Type", "application/json")
            .header("Authorization", backend.admin_auth_header.0.encode())
            .body(Body::from(serde_json::to_vec(&json!({
                "logStreamType": "webhook",
                "url": "https://example.com/webhook2",
                "format": "json",
            }))?))?;

        backend
            .expect_error(req, StatusCode::CONFLICT, "LogStreamAlreadyExists")
            .await?;

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_create_log_stream_invalid_webhook_url(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        let req = Request::builder()
            .uri("/api/v1/create_log_stream")
            .method("POST")
            .header("Content-Type", "application/json")
            .header("Authorization", backend.admin_auth_header.0.encode())
            .body(Body::from(serde_json::to_vec(&json!({
                "logStreamType": "webhook",
                "url": "not-a-valid-url",
                "format": "jsonl",
            }))?))?;

        backend
            .expect_error(req, StatusCode::BAD_REQUEST, "InvalidWebhookUrl")
            .await?;

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_create_log_stream_invalid_sentry_dsn(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        let req = Request::builder()
            .uri("/api/v1/create_log_stream")
            .method("POST")
            .header("Content-Type", "application/json")
            .header("Authorization", backend.admin_auth_header.0.encode())
            .body(Body::from(serde_json::to_vec(&json!({
                "logStreamType": "sentry",
                "dsn": "not-a-valid-dsn",
                "tags": null,
            }))?))?;

        backend
            .expect_error(req, StatusCode::BAD_REQUEST, "InvalidSentryDsn")
            .await?;

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_update_log_stream_omit_field(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        let create_response = create_log_stream(
            &backend,
            json!({
                "logStreamType": "datadog",
                "siteLocation": "US1",
                "ddApiKey": "original_key",
                "ddTags": ["tag1"],
                "service": "original-service",
            }),
        )
        .await?;

        let id = create_response["id"].as_str().unwrap();

        // Update only the service field
        update_log_stream(
            &backend,
            id,
            json!({
                "logStreamType": "datadog",
                "service": "updated-service",
            }),
        )
        .await?;

        let stream = get_log_stream(&backend, id).await?;
        assert_eq!(stream["siteLocation"], "US1");
        assert_eq!(stream["ddTags"], json!(["tag1"]));
        assert_eq!(stream["service"], "updated-service");

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_update_log_stream_unset_optional_field(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        let create_response = create_log_stream(
            &backend,
            json!({
                "logStreamType": "datadog",
                "siteLocation": "US1",
                "ddApiKey": "test_key",
                "ddTags": [],
                "service": "my-service",
            }),
        )
        .await?;

        let id = create_response["id"].as_str().unwrap();

        // Unset the service field
        update_log_stream(
            &backend,
            id,
            json!({
                "logStreamType": "datadog",
                "service": null,
            }),
        )
        .await?;

        let stream = get_log_stream(&backend, id).await?;
        assert!(stream.get("service").is_none());

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_update_log_stream_not_found(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        let req = Request::builder()
            .uri("/api/v1/update_log_stream/invalid_id")
            .method("POST")
            .header("Content-Type", "application/json")
            .header("Authorization", backend.admin_auth_header.0.encode())
            .body(Body::from(serde_json::to_vec(&json!({
                "logStreamType": "webhook",
                "url": "https://example.com/new",
            }))?))?;

        backend
            .expect_error(req, StatusCode::BAD_REQUEST, "InvalidLogStreamId")
            .await?;

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_update_log_stream_type_mismatch(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        let create_response = create_log_stream(
            &backend,
            json!({
                "logStreamType": "webhook",
                "url": "https://example.com/webhook",
                "format": "jsonl",
            }),
        )
        .await?;

        let id = create_response["id"].as_str().unwrap();

        let req = Request::builder()
            .uri(format!("/api/v1/update_log_stream/{id}"))
            .method("POST")
            .header("Content-Type", "application/json")
            .header("Authorization", backend.admin_auth_header.0.encode())
            .body(Body::from(serde_json::to_vec(&json!({
                "logStreamType": "datadog",
                "ddApiKey": "new_key",
            }))?))?;

        backend
            .expect_error(req, StatusCode::BAD_REQUEST, "LogStreamTypeMismatch")
            .await?;

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_update_log_stream_preserves_version(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        let create_response = create_log_stream(
            &backend,
            json!({
                "logStreamType": "datadog",
                "siteLocation": "US1",
                "ddApiKey": "test_key",
                "ddTags": [],
                "service": null,
            }),
        )
        .await?;

        let id = create_response["id"].as_str().unwrap();

        // Update the API key
        update_log_stream(
            &backend,
            id,
            json!({
                "logStreamType": "datadog",
                "ddApiKey": "new_key",
            }),
        )
        .await?;

        // Check the version is still V2
        let config = get_log_sink_from_db(&backend, id).await?;
        match config.unwrap() {
            SinkConfig::Datadog(config) => {
                assert_eq!(config.version, LogEventFormatVersion::V2);
            },
            _ => panic!("Expected Datadog config"),
        }

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_update_log_stream_webhook_preserves_hmac_secret(
        rt: ProdRuntime,
    ) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        let create_response = create_log_stream(
            &backend,
            json!({
                "logStreamType": "webhook",
                "url": "https://example.com/webhook",
                "format": "jsonl",
            }),
        )
        .await?;

        let id = create_response["id"].as_str().unwrap();
        let original_secret = create_response["hmacSecret"].as_str().unwrap();

        // Update the URL
        update_log_stream(
            &backend,
            id,
            json!({
                "logStreamType": "webhook",
                "url": "https://example.com/new-webhook",
            }),
        )
        .await?;

        let stream = get_log_stream(&backend, id).await?;
        assert_eq!(stream["url"], "https://example.com/new-webhook");
        assert_eq!(stream["hmacSecret"], original_secret);

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_delete_log_stream(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        let create_response = create_log_stream(
            &backend,
            json!({
                "logStreamType": "webhook",
                "url": "https://example.com/webhook",
                "format": "jsonl",
            }),
        )
        .await?;

        let id = create_response["id"].as_str().unwrap();

        delete_log_stream(&backend, id).await?;

        // Verify it's deleted
        let req = Request::builder()
            .uri(format!("/api/v1/get_log_stream/{id}"))
            .method("GET")
            .header("Authorization", backend.admin_auth_header.0.encode())
            .body(Body::empty())?;

        backend
            .expect_error(req, StatusCode::BAD_REQUEST, "LogStreamDoesntExist")
            .await?;

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_delete_log_stream_not_found(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        let req = Request::builder()
            .uri("/api/v1/delete_log_stream/invalid_id")
            .method("POST")
            .header("Authorization", backend.admin_auth_header.0.encode())
            .body(Body::empty())?;

        backend
            .expect_error(req, StatusCode::BAD_REQUEST, "InvalidLogStreamId")
            .await?;

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_delete_and_recreate_same_type(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        let create_response = create_log_stream(
            &backend,
            json!({
                "logStreamType": "webhook",
                "url": "https://example.com/webhook",
                "format": "jsonl",
            }),
        )
        .await?;

        let id = create_response["id"].as_str().unwrap();
        delete_log_stream(&backend, id).await?;

        // Create a new webhook log stream
        let new_response = create_log_stream(
            &backend,
            json!({
                "logStreamType": "webhook",
                "url": "https://example.com/new-webhook",
                "format": "json",
            }),
        )
        .await?;

        assert!(new_response["id"].is_string());
        assert_ne!(new_response["id"], id);

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_rotate_webhook_secret(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        let create_response = create_log_stream(
            &backend,
            json!({
                "logStreamType": "webhook",
                "url": "https://example.com/webhook",
                "format": "jsonl",
            }),
        )
        .await?;

        let id = create_response["id"].as_str().unwrap();
        let original_secret = create_response["hmacSecret"].as_str().unwrap();

        let rotate_response = rotate_webhook_secret(&backend, id).await?;
        let new_secret = rotate_response["hmacSecret"].as_str().unwrap();

        assert_ne!(new_secret, original_secret);

        // Verify the stream still has the same URL and format
        let stream = get_log_stream(&backend, id).await?;
        assert_eq!(stream["url"], "https://example.com/webhook");
        assert_eq!(stream["format"], "jsonl");
        assert_eq!(stream["hmacSecret"], new_secret);

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_rotate_secret_non_webhook_errors(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        let create_response = create_log_stream(
            &backend,
            json!({
                "logStreamType": "datadog",
                "siteLocation": "US1",
                "ddApiKey": "test_key",
                "ddTags": [],
                "service": null,
            }),
        )
        .await?;

        let id = create_response["id"].as_str().unwrap();

        let req = Request::builder()
            .uri(format!("/api/v1/rotate_webhook_secret/{id}"))
            .method("POST")
            .header("Authorization", backend.admin_auth_header.0.encode())
            .body(Body::empty())?;

        backend
            .expect_error(req, StatusCode::BAD_REQUEST, "NoSecretToRotate")
            .await?;

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_rotate_secret_not_found(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        let req = Request::builder()
            .uri("/api/v1/rotate_webhook_secret/invalid_id")
            .method("POST")
            .header("Authorization", backend.admin_auth_header.0.encode())
            .body(Body::empty())?;

        backend
            .expect_error(req, StatusCode::BAD_REQUEST, "InvalidLogStreamId")
            .await?;

        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_multiple_log_streams_different_types(rt: ProdRuntime) -> anyhow::Result<()> {
        let backend = setup_backend_for_test(rt).await?;

        create_log_stream(
            &backend,
            json!({
                "logStreamType": "webhook",
                "url": "https://example.com/webhook",
                "format": "jsonl",
            }),
        )
        .await?;

        create_log_stream(
            &backend,
            json!({
                "logStreamType": "datadog",
                "siteLocation": "US1",
                "ddApiKey": "test_key",
                "ddTags": [],
                "service": null,
            }),
        )
        .await?;

        create_log_stream(
            &backend,
            json!({
                "logStreamType": "axiom",
                "apiKey": "test_key",
                "datasetName": "my-dataset",
                "attributes": [],
            }),
        )
        .await?;

        let streams = list_log_streams(&backend).await?;
        assert_eq!(streams.len(), 3);

        Ok(())
    }
}
